dubbo provider端接收request与发送response

您所在的位置:网站首页 dubbo client can not supported dubbo provider端接收request与发送response

dubbo provider端接收request与发送response

2023-08-20 23:53| 来源: 网络整理| 查看: 265

Dubbo接收request的过程其实和client端接收response的过程有点像,建议先阅读上一篇文章。从NettyHandler开始收到tcp请求,再一直传递到HeaderExchangeHandler的过程和client端接收response的过程基本一样,不再详细描述。

@Override public void received(Channel channel, Object message) throws RemotingException { channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { if (message instanceof Request) { // handle request. Request request = (Request) message; if (request.isEvent()) { handlerEvent(channel, request); } else { if (request.isTwoWay()) { //处理request请求 handleRequest(exchangeChannel, request); } else { handler.received(exchangeChannel, request.getData()); } } } else if (message instanceof Response) { //处理response handleResponse(channel, (Response) message); } else if (message instanceof String) { if (isClientSide(channel)) { Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); logger.error(e.getMessage(), e); } else { String echo = handler.telnet(channel, (String) message); if (echo != null && echo.length() > 0) { channel.send(echo); } } } else { handler.received(exchangeChannel, message); } } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException { Response res = new Response(req.getId(), req.getVersion()); //暂时不理解这个Broken是什么意思 if (req.isBroken()) { Object data = req.getData(); String msg; if (data == null) msg = null; else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data); else msg = data.toString(); res.setErrorMessage("Fail to decode request due to: " + msg); res.setStatus(Response.BAD_REQUEST); channel.send(res); return; } // find handler by message class. Object msg = req.getData(); try { // handle data. 
//代码执行到这里,就是开始调用我们实现的业务代码去获取数据 //这个handler就是DubboProtocol里的handler对象,调用其reply()方法执行,通过放射执行业务代码获取数据 CompletableFuture future = handler.reply(channel, msg); if (future.isDone()) { res.setStatus(Response.OK); res.setResult(future.get()); channel.send(res); return; } future.whenComplete((result, t) -> { try { if (t == null) { res.setStatus(Response.OK); res.setResult(result); } else { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(t)); } //把拿到的response写出去 channel.send(res); } catch (RemotingException e) { logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e); } finally { // HeaderExchangeChannel.removeChannelIfDisconnected(channel); } }); } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); channel.send(res); } }

当拿到response之后就会把response写出去,这里的channel对象就是HeaderExchangeChannel,后面会转发给NettyChannel对象把数据写出去。

下面看下DubboProtocol里的handler的reply实现:

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { @Override public CompletableFuture reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; Invoker invoker = getInvoker(channel, inv); // need to consider backward-compatibility if it's a callback if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; if (methodsStr == null || !methodsStr.contains(",")) { hasMethod = inv.getMethodName().equals(methodsStr); } else { String[] methods = methodsStr.split(","); for (String method : methods) { if (inv.getMethodName().equals(method)) { hasMethod = true; break; } } } if (!hasMethod) { logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored." + " please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv); return null; } } RpcContext rpcContext = RpcContext.getContext(); boolean supportServerAsync = invoker.getUrl().getMethodParameter(inv.getMethodName(), Constants.ASYNC_KEY, false); if (supportServerAsync) { CompletableFuture future = new CompletableFuture(); rpcContext.setAsyncContext(new AsyncContextImpl(future)); } rpcContext.setRemoteAddress(channel.getRemoteAddress()); //获取Invoker对象执行invoke //这里会把filter执行一遍然后拿到结果 Result result = invoker.invoke(inv); if (result instanceof AsyncRpcResult) { return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r); } else { return CompletableFuture.completedFuture(result); } } throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : (message.getClass().getName() + ": " + message)) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); } ...省略 };


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3